package xyz.scootaloo.kami.server.service.impl

import io.vertx.core.Future
import io.vertx.core.json.JsonObject
import io.vertx.kotlin.coroutines.await
import xyz.scootaloo.kami.server.controller.dto.SC
import xyz.scootaloo.kami.server.controller.dto.StatusCode
import xyz.scootaloo.kami.server.model.ActualUploadFileInfo
import xyz.scootaloo.kami.server.model.SysTask
import xyz.scootaloo.kami.server.model.UploadFileResponse
import xyz.scootaloo.kami.server.model.UploadTaskInfo
import xyz.scootaloo.kami.server.model.dao.TaskDAO
import xyz.scootaloo.kami.server.standard.*
import xyz.scootaloo.kami.server.standard.SyncMark
import xyz.scootaloo.kami.server.service.*
import xyz.scootaloo.kami.server.service.UploadService.UploadFileChunk
import java.util.*
import kotlin.math.ceil
import kotlin.math.min

/**
 * @author flutterdash@qq.com
 * @since 2022/2/24 23:32
 */
object InternalUploadServiceImpl : UploadService, FileUploadHelper, ApplicationStageListener {

    private val log by lazy { getLogger() }

    private val fileViewService = FileSystemViewService()
    private val fileResolver = FileResolver()
    private val systemService = SystemService()
    private val tokenService = TokenService()
    private val cacheService = CacheService()
    private val crontabService = CrontabService()
    private val fileLogService = FileLogService()

    override suspend fun createUploadTask(
        uploader: Int, relativePath: String, files: List<ActualUploadFileInfo>
    ): UploadFileResponse {
        val task = UploadTaskManager.createNewTask(uploader, relativePath, files)
        logUploadTaskInfo(task)
        return UploadFileResponse(
            true, "", task.taskId, null, files
        )
    }

    override suspend fun handleUploadFileChunk(chunkInfo: UploadFileChunk): StatusCode {
        val taskInfo = getUploadTaskById(chunkInfo.taskId) ?: throw taskNotExists(chunkInfo.taskId)
        val subTaskInfo = taskInfo.files[chunkInfo.filename] ?: throw subTaskNotExists(chunkInfo.filename)

        val (continueProcess, offset) = FileChunkedUtil.verifyChunkInfo(chunkInfo, subTaskInfo)
        if (!continueProcess) {
            chunkInfo.uploadFile.asyncDelete()
            return SC.FILE_CHUNK_HAS_DUPLICATED_SUBMIT
        }

        val targetFilepath = "${taskInfo.uploadPath}/${subTaskInfo.tmpFilename}"
        val absoluteTargetPath = fileResolver.realPathString(targetFilepath)
        chunkInfo.uploadFile.transferTo(absoluteTargetPath, offset)
        UploadTaskManager.updateSubTaskProgress(
            taskInfo, subTaskInfo, chunkInfo.chunkNum
        ).await() // 等待进度更新完成
        return SC.FILE_CHUNK_TRANSFER_SUCCESS
    }

    override suspend fun findSubTaskInfo(taskId: String, filename: String): ActualUploadFileInfo {
        val taskInfo = getUploadTaskById(taskId) ?: throw taskNotExists(taskId)
        return taskInfo.files[filename] ?: throw subTaskNotExists(filename)
    }

    override suspend fun findExistsTaskInfo(taskId: String): UploadFileResponse {
        val taskInfo = getUploadTaskById(taskId) ?: throw taskNotExists(taskId)
        return UploadFileResponse(
            true, "", taskId, null, taskInfo.files.values.toList()
        )
    }

    override fun afterApplicationStarted() {
        crontabService.submit(UploadTaskCrontab)
    }

    override fun handle(dbTask: SysTask) {
        UploadTaskManager.createTaskFromDatabaseRecord(dbTask)
    }

    private suspend fun getUploadTaskById(taskId: String): UploadTaskInfo? {
        return UploadTaskManager.getTaskFromCache(taskId).await()
    }

    private fun logUploadTaskInfo(task: UploadTaskInfo) {
        val taskId = task.taskId
        val fileCount = task.files.size
        val totalSize = task.files.map { it.value.filesize }.sum()
        val readableSize = fileResolver.humanReadableSize(totalSize)
        log.info("创建上传任务:`$taskId`, 上传文件数量:$fileCount, 总大小:$readableSize")
    }

    private fun taskNotExists(taskId: String) = UploadTaskNotExistsException(taskId)
    private fun subTaskNotExists(filename: String) = FileNotInTaskException(filename)

    object FileChunkedUtil {
        /* 文件大小与分块的关系
         *
         *   filesize   | chunkSize | chunkCount
         * -------------+-----------+-----------
         *     0 ~ 5M   |    32Kb   |   0 ~ 160
         *    5M ~ 100M |   256Kb   |  20 ~ 400
         *  100M ~ 2G   |     2M    |  50 ~ 1024
         *    2G ~ 25G  |     4M    | 512 ~ 6400
         */
        private const val defFileChunkSize = (1024 * 32L) // 32 Kb
        private const val midFileChunkSize = (1024 * 256L) // 256 Kb
        private const val big1FileChunkSize = (1024 * 1024 * 2L) // 2 Mb
        private const val big2FileChunkSize = (1024 * 1024 * 8L) // 8 Mb

        private const val smallFileLimit = (1024 * 1024 * 5L) // 5 Mb
        private const val mediumFileLimit = (1024 * 1024 * 100L) // 100 Mb
        private const val bigFileLimit = (1024 * 1024 * 1024 * 2L) // 2 G

        fun handleFileChunked(task: UploadTaskInfo) {
            for ((_, fileInfo) in task.files) {
                val (perChunkSize, chunkCount) = calculateSuitableChunkedSize(fileInfo.filesize)
                fileInfo.chunkedSize = perChunkSize
                fileInfo.chunkCount = chunkCount
                fileInfo.updateProgress()
            }
        }

        // 返回这个分段文件是否需要继续处理
        fun verifyChunkInfo(
            chunkInfo: UploadFileChunk, actualFileInfo: ActualUploadFileInfo
        ): Pair<Boolean, Long> {
            // 检查上传文件块的编号是否在预期范围内
            val realChunkCount = actualFileInfo.chunkCount
            val submitChunkNum = chunkInfo.chunkNum
            if (submitChunkNum >= realChunkCount || submitChunkNum < 0) {
                val msg = "文件分块错误, 服务器中记录文件块编号应为0~${realChunkCount - 1}, " +
                        "但提交的文件编号为$submitChunkNum"
                throw UploadTaskInvalidFormatException(msg)
            }

            // 如果这个编号的文件块已经处理过, 则跳过余下的流程
            if (actualFileInfo.progress[submitChunkNum]) return false to -1

            // 检查上传的文件的分块依据是否和预期一致
            val realChunkedSize = actualFileInfo.chunkedSize
            val submitChunkedSize = chunkInfo.chunkedSize
            if (submitChunkedSize != realChunkedSize) {
                val msg = "文件分块错误, 服务器中记录的文件分块大小是$realChunkCount, " +
                        "但提交的文件分块大小为$submitChunkedSize"
                throw UploadTaskInvalidFormatException(msg)
            }

            // 检查上传的文件块的实际大小是否和预期一致
            val filesize = actualFileInfo.filesize
            val submitChunkSize = chunkInfo.uploadFile.size()
            val realChunkSize = calculateRealChunkSize(filesize, realChunkedSize, submitChunkNum)
            if (realChunkSize != submitChunkSize) {
                val msg = "文件分块错误, 服务记录的文件大小为$filesize, 分块依据为$realChunkedSize, " +
                        "提交的文件块编号为$submitChunkNum, 此编号的文件块实际大小应为$realChunkSize, " +
                        "但提交的文件大小为$submitChunkSize"
                throw UploadTaskInvalidFormatException(msg)
            }

            return true to (submitChunkNum * realChunkedSize)
        }

        private fun calculateRealChunkSize(filesize: Long, chunkedSize: Long, chunkNum: Int): Long {
            if (chunkNum == 0) return min(filesize, chunkedSize)
            val rest = filesize - (chunkNum * chunkedSize)
            return min(rest, chunkedSize)
        }

        private fun calculateSuitableChunkedSize(filesize: Long): Pair<Long, Long> {
            val chunkedSize: Long = when {
                filesize < smallFileLimit -> defFileChunkSize
                filesize < mediumFileLimit -> midFileChunkSize
                filesize < bigFileLimit -> big1FileChunkSize
                else -> big2FileChunkSize
            }
            val chunkCount = ceil(filesize / chunkedSize.toDouble()).toLong()
            return chunkedSize to chunkCount
        }
    }

    object UploadTaskManager {

        @SyncMark val taskMapper = TreeMap<String, UploadTaskInfo>()

        suspend fun createNewTask(
            uploader: Int, relativePath: String, files: List<ActualUploadFileInfo>
        ): UploadTaskInfo {
            val taskId = generateTaskId(uploader, relativePath)
            return cacheAndStoreTask(uploader, taskId, relativePath, files)
        }

        fun createTaskFromDatabaseRecord(dbTask: SysTask) {
            val task = jsonToTask(dbTask.payload)
            task.dbId = dbTask.id
            task.hasChanged = false
            task.hasSaveChanged = true
            cacheTaskOnContext(task)
        }

        fun removeTaskInfoOnContext(task: UploadTaskInfo, exitState: Int) {
            cacheService.remove(getTaskCacheKey(task.taskId))
            taskMapper.remove(task.taskId)
            Sync.run {
                if (exitState == SysTask.FAILURE)
                    asyncCleanUpInvalidTask(task)
                TaskDAO.updateTaskState(task.dbId, exitState)
            }
        }

        fun updateSubTaskProgress(
            mainTask: UploadTaskInfo, subTask: ActualUploadFileInfo, chunkNum: Int
        ): Future<Unit> {
            return Sync.run {
                if (!subTask.done && !subTask.progress[chunkNum]) {
                    subTask.progress[chunkNum] = true
                    subTask.successCount++
                    if (subTask.successCount == subTask.chunkCount) {
                        val msg = "上传任务编号`${mainTask.taskId}`, 文件`${subTask.uploadFilename}`传输完成"
                        log.info(msg)
                        subTask.done = true
                        mainTask.successCount++
                        if (mainTask.successCount == mainTask.files.size) {
                            mainTask.done = true
                            systemService.gc()
                            log.info("上传编号`${mainTask.taskId}`全部任务传输完成")
                        }
                        asyncFinishSubTask(mainTask, subTask)
                    }
                    mainTask.hasChanged = true
                    mainTask.hasSaveChanged = false
                }
            }
        }

        fun asyncSaveProgressChanges(tasks: List<UploadTaskInfo>) = Async.run {
            TaskDAO.batchUpdatePayload(tasks.map { it.dbId to taskToJson(it) })
        }

        fun getTaskFromCache(taskId: String): Future<UploadTaskInfo?> {
            val cacheKey = getTaskCacheKey(taskId)
            return cacheService.get(cacheKey)
        }

        private fun getTaskCacheKey(taskId: String): String {
            return "upload:$taskId"
        }

        private fun asyncCleanUpInvalidTask(task: UploadTaskInfo) {
            for ((_, file) in task.files) {
                if (!file.done) {
                    fileViewService.asyncDeleteFileAndUpdateCap(task.uploadPath, file.tmpFilename)
                }
            }
        }

        private fun asyncFinishSubTask(mainTask: UploadTaskInfo, subTask: ActualUploadFileInfo) {
            val uploadPath = mainTask.uploadPath
            Async<Unit> { promise ->
                fileResolver.finishUploadTask(
                    uploadPath, subTask.tmpFilename
                ).onComplete { done ->
                    if (done.succeeded()) {
                        fileLogService.onFileUpload(mainTask.uploader, uploadPath, done.result())
                        promise.complete()
                    } else {
                        promise.fail(done.cause())
                    }
                }
            }
        }

        private suspend fun cacheAndStoreTask(
            uploader: Int, taskId: String, uploadPath: String, files: List<ActualUploadFileInfo>
        ): UploadTaskInfo {
            val task = UploadTaskInfo.create(uploader, taskId, uploadPath, files)
            FileChunkedUtil.handleFileChunked(task)
            tryStoreTaskToDatabase(uploader, task)
            cacheTaskOnContext(task).await()
            return task
        }

        private fun cacheTaskOnContext(task: UploadTaskInfo): Future<Void> {
            // 先同步更新treeMap, 然后异步等待更新缓存
            return Sync { promise ->
                putTaskOnContext(task)
                cacheService.putObject(
                    getTaskCacheKey(task.taskId), task,
                    UploadTaskCrontab.maxWaitTime
                ).onComplete { done ->
                    if (done.succeeded()) {
                        promise.complete()
                    } else {
                        promise.fail(done.cause())
                    }
                }
            }
        }

        private fun putTaskOnContext(task: UploadTaskInfo) {
            taskMapper[task.taskId] = task
        }

        private fun tryStoreTaskToDatabase(uid: Int, task: UploadTaskInfo) = try {
            val dbTask = SysTask {
                user = uid
                type = CrontabService.DatabaseCrontabHandler.UPLOAD_TASK
                payload = taskToJson(task)
                state = SysTask.RUNNING
            }
            TaskDAO.store(dbTask)
            task.dbId = dbTask.id
        } catch (e: Throwable) {
            throw e
        }

        private fun jsonToTask(json: String): UploadTaskInfo {
            return JsonObject(json).mapTo(UploadTaskInfo::class.java)
        }

        private fun taskToJson(task: UploadTaskInfo): String {
            return task.toJsonObject().toString()
        }

        private fun generateTaskId(uploader: Int, path: String): String {
            // 编码规则 >> 用户id:上传路径:当前时间的后8位:随机6位编码
            val randomCode = tokenService.generateRandomString(6)
            val timeSection = currentTimeSection()
            return "$uploader:$path:$timeSection:$randomCode"
        }

        private fun currentTimeSection(): String {
            val currentTime = currentTimeMillis()
            val shiftValue = (currentTime shr 8) shl 8
            return (currentTime - shiftValue).toString()
        }
    }

    /**
     * 文件上传定时任务
     *
     * 每 5 分钟检查一次上传任务列表, 当发现有任务上传完成或过期,
     * 则将任务的更新状态写入数据库, 并删除此任务在内存中的各种缓存对象
     */
    object UploadTaskCrontab : CrontabService.Crontab {

        // 定时任务属性
        override var valid = true
        override var delay = (1000 * 60 * 5L) // 5分钟
        override val name = "upload-task-service"
        override val order = CrontabService.Crontab.defOrder

        // 上传任务配置

        const val maxWaitTime = (1000 * 60 * 20L) // 20分钟: 最久等待20分钟, 如果一个任务20分钟无响应, 则系统自动回收这个任务

        override fun run() {
            val eliminateQueue = ArrayList<Pair<UploadTaskInfo, Int>>()
            val changes = ArrayList<UploadTaskInfo>()
            val currentTime = currentTimeMillis()

            for ((taskId, task) in UploadTaskManager.taskMapper) {
                if (task.done) {
                    eliminateQueue.add(task to SysTask.FINISH)
                    log.info("上传任务编号`$taskId`传输完成")
                    continue
                }
                if (currentTime - task.lastActionTime > maxWaitTime) {
                    eliminateQueue.add(task to SysTask.FAILURE)
                    log.warn("由于太长时间没有新的上传事件, 编号`$taskId`的上传任务取消")
                    continue
                }
                if (task.hasChanged && !task.hasSaveChanged) {
                    task.hasChanged = false
                    task.hasSaveChanged = true
                    changes.add(task)
                }
            }
            if (eliminateQueue.isNotEmpty()) {
                updateAndClearTask(eliminateQueue)
            }
            if (changes.isNotEmpty()) {
                saveChanges(changes)
            }
        }

        private fun updateAndClearTask(eliminateQueue: List<Pair<UploadTaskInfo, Int>>) {
            for ((task, state) in eliminateQueue) {
                UploadTaskManager.removeTaskInfoOnContext(task, state)
            }
            if (eliminateQueue.any { it.second == SysTask.FAILURE }) {
                systemService.gc()
            }
        }

        private fun saveChanges(tasks: List<UploadTaskInfo>) {
            UploadTaskManager.asyncSaveProgressChanges(tasks)
        }
    }
}